Skip to content

fix: pin cache store workers to local device#1116

Open
Vinkle-hzt wants to merge 10 commits into
alibaba:mainfrom
Vinkle-hzt:fix/pin_device
Open

fix: pin cache store workers to local device#1116
Vinkle-hzt wants to merge 10 commits into
alibaba:mainfrom
Vinkle-hzt:fix/pin_device

Conversation

@Vinkle-hzt

Copy link
Copy Markdown
Collaborator

Summary

Fix cache store background workers to run on the correct local GPU device.

Cache store work can run from background thread pools and RPC callbacks. These threads do not always inherit the expected CUDA/HIP current device, which can lead to cache store operations using the wrong device in multi-GPU deployments. This PR passes the local rank device id into cache store components and pins worker threads before they execute device-sensitive work.

Changes

  • Add pinThreadToDeviceOnce() utility for CUDA/ROCm device pinning.
  • Propagate local_rank into cache store init params and async cache writer.
  • Pin cache store readiness, store, load, TCP load callback, and async writer tasks to the configured local device.
  • Keep device pinning no-op for invalid device ids and non-GPU builds.

Testing

Not run locally.

@Vinkle-hzt Vinkle-hzt requested a review from LLLLKKKK as a code owner June 17, 2026 10:16
@LLLLKKKK

Copy link
Copy Markdown
Collaborator

AI Code Review - PR #1116

Status: LGTM

Summary: P0/0 · P1/0 · P2/2 · P3/0

lgtm ready to ci

Non-blocking Suggestions

P2

  • 设备切换失败后仍继续执行后台拷贝 @ rtp_llm/cpp/utils/DevicePin.h:26
    • 建议:将 pin 失败改为 fail-fast,或让 helper 返回 bool/Status 并让调用方终止该后台任务并返回明确错误。
  • 缺少非默认 GPU pin 的回归覆盖 @ rtp_llm/models_py/bindings/core/CacheStoreAsyncWriter.cc:46
    • 建议:补充 CUDA/ROCm gated 或可 mock 的单测,构造非默认 device_id,并在后台任务内断言 current device 正确;必要时覆盖 Tcp load 的 GPU copy 路径。

Checklist Violations (6 fail / 34 total)

General Principles Checklist

  • [6.1] Architecture — 状态不变量:创建/更新/失败/重试/回滚路径有效 → issue 设备切换失败后仍继续执行后台拷贝
    设备 pin 失败后 helper 直接返回,调用方状态仍视为已 pin 并继续执行后台拷贝。
  • [6.1] Architecture — 错误语义:fail-fast/retry/fallback/silent 行为显式 → issue 设备切换失败后仍继续执行后台拷贝
    cudaSetDevice/hipSetDevice 失败是配置或设备状态错误,当前仅 WARNING 后静默 fallback 到线程原有 device。
  • [6.1] Tests — 新逻辑有聚焦单测 + 相关集成/smoke 测试 → issue 缺少非默认 GPU pin 的回归覆盖
    diff_paths 没有测试变更,现有 async writer 测试只走 device_id=-1 默认路径。
  • [6.1] Tests — 分布式/跨平台变更有对应覆盖 → issue 缺少非默认 GPU pin 的回归覆盖
    改动同时涉及 CUDA/ROCm 和多卡 local_rank 语义,但没有看到对应平台 gated 验证。

RTP-LLM Checklist

  • [F] 跨平台 — ROCm 路径错误处理非静默 → issue 设备切换失败后仍继续执行后台拷贝
    hipSetDevice 失败只 WARNING 后 return,后续任务继续执行。
  • [H] 测试与 CI — 测试覆盖充分:大重构等价覆盖,新功能端到端测试 → issue 缺少非默认 GPU pin 的回归覆盖
    新增后台线程 device pin 行为没有随 PR 增加测试覆盖。

Strengths

  • device_id 从 RemoteRpcServer、CacheStore/Messager 到 CacheStoreAsyncWriter 的传播链较完整,默认值 -1 也保留了旧调用路径兼容性。

@LLLLKKKK

Copy link
Copy Markdown
Collaborator

AI Code Review - PR #1116

Status: BLOCKING

Summary: P0/0 · P1/1 · P2/1 · P3/0

Blocking Issues

P1

  • TCP 服务端/transfer 回调仍未绑定 device @ rtp_llm/cpp/disaggregate/cache_store/TcpMessager.cpp:25
    • 建议:将 device_id 继续传入 TcpCacheStoreServiceImpl/TcpTransferConnection/TcpBlockReadClosure,并在 blockReadImpl 与 TcpBlockReadClosure::Run 的 GPU copy 前调用 pinThreadToDeviceOnce。

Non-blocking Suggestions

P2

  • 测试覆盖缺口:device pin 的有效设备路径未验证 @ rtp_llm/cpp/utils/DevicePin.h:15
    • 建议:补充 CUDA/ROCm 条件测试:用有效 local_rank 创建 writer 或 cache store 任务,并在后台线程断言当前 device 正确。

Checklist Violations (3 fail / 46 total)

General Principles Checklist

  • [6.1] Architecture — 状态不变量:创建/更新/失败/重试/回滚路径有效 → issue TCP 服务端/transfer 回调仍未绑定 device
    普通 load closure 已 pin,但 TCP 服务端和 transfer/blockRead 的 RPC 回调线程仍直接执行 GPU copy,device 状态不变量未覆盖所有异步入口。
  • [6.1] Tests — 新逻辑有聚焦单测 + 相关集成/smoke 测试 → issue 测试覆盖缺口:device pin 的有效设备路径未验证
    新增有效 device 绑定路径未见测试覆盖,现有 CacheStoreAsyncWriterTest 默认 device_id=-1 会绕过 pinThreadToDeviceOnce。
  • [6.1] Tests — 分布式/跨平台变更有对应覆盖 → checklist-only
    变更涉及 CUDA/ROCm 和多 local_rank 后台线程,但 diff 未增加对应覆盖;主要风险已由 device pin 遗漏和测试 issue 覆盖。

Strengths

  • 新增 device_id 默认值为 -1,旧测试和未配置路径可以保持原行为。
  • 新增 DevicePin 将后台线程 device 选择集中到统一工具函数,调用点复用同一实现。

@LLLLKKKK

Copy link
Copy Markdown
Collaborator

AI Code Review - PR #1116

Status: BLOCKING

Summary: P0/0 · P1/1 · P2/1 · P3/0

Blocking Issues

P1

  • 设备 pin 异常会绕过异步 writer 的异常收集 @ rtp_llm/models_py/bindings/core/CacheStoreAsyncWriter.cc:47
    • 建议:把 pinThreadToDeviceOnce 纳入 try/catch,并用 RAII/scope guard 保证 pending_count_-- 与 wait_cv_ notify 在所有退出路径执行,再通过 stored_exception_ 传播异常。

Non-blocking Suggestions

P2

  • 缺少非 0 device 的后台 cache store 覆盖 @ rtp_llm/cpp/utils/DevicePin.h:15
    • 建议:增加带 device_id=local_rank 的多 GPU guarded UT 或集成用例,验证后台 store/load/read closure 在非 0 device 上读写正确。

Checklist Violations (6 fail / 38 total)

General Principles Checklist

  • [6.1] Architecture — 状态不变量:创建/更新/失败/重试/回滚路径有效 → issue 设备 pin 异常会绕过异步 writer 的异常收集
    CacheStoreAsyncWriter.cc:47 的 pinThreadToDeviceOnce 在 try/catch 外执行,失败时不会维护 pending_count/stored_exception_ 不变量。_
  • [6.1] Architecture — 错误语义:fail-fast/retry/fallback/silent 行为显式 → issue 设备 pin 异常会绕过异步 writer 的异常收集
    async writer 已设计为捕获后台异常并在 waitAllDone 重抛,但新增设备 pin 失败会绕过该异常传播语义。
  • [6.1] Tests — 新逻辑有聚焦单测 + 相关集成/smoke 测试 → issue 缺少非 0 device 的后台 cache store 覆盖
    diff 未包含测试;现有 closure/messager 相关测试仍主要走默认 device_id=-1 或 device 0,未覆盖 device pin 生效或失败路径。
  • [6.1] Tests — 分布式/跨平台变更有对应覆盖 → issue 缺少非 0 device 的后台 cache store 覆盖
    变更包含 CUDA/ROCm 分支和多卡后台线程设备绑定,但 diff 未新增非 0 local_rank 或跨平台 guarded 覆盖。

RTP-LLM Checklist

  • [C] 线程安全与并发 — Worker 线程异常未传播到调用方 → issue 设备 pin 异常会绕过异步 writer 的异常收集
    CacheStoreAsyncWriter.cc:47 的 pinThreadToDeviceOnce 在 try 外,异常不会进入 stored_exception,也不会减少 pending_count_。_
  • [H] 测试与 CI — 测试覆盖充分:大重构等价覆盖,新功能端到端测试 → issue 缺少非 0 device 的后台 cache store 覆盖
    新增非 0 device 绑定路径未见端到端或 guarded UT 覆盖。

Strengths

  • device_id 从 RemoteRpcServer/PyWrappedModel 贯通到 cache store、TCP closure 和 async writer,主要后台线程入口都有显式 pin。

@LLLLKKKK

Copy link
Copy Markdown
Collaborator

AI Code Review - PR #1116

Status: LGTM

Summary: P0/0 · P1/0 · P2/1 · P3/0

lgtm ready to ci

Non-blocking Suggestions

P2

  • 缺少 device pinning 的有效覆盖 @ rtp_llm/cpp/utils/DevicePin.h:25
    • 建议:补充可在 GPU 环境运行的 device_id>=0 测试,或抽象 set-device 钩子做单测覆盖 CUDA/ROCm 分支与异常路径。

Checklist Violations (3 fail / 82 total)

General Principles Checklist

  • [6.1] Tests — 新逻辑有聚焦单测 + 相关集成/smoke 测试 → issue 缺少 device pinning 的有效覆盖
    新增 CUDA/ROCm device pinning,但测试没有覆盖 device_id>=0 的后台线程行为。
  • [6.1] Tests — 分布式/跨平台变更有对应覆盖 → issue 缺少 device pinning 的有效覆盖
    DevicePin 新增 CUDA/ROCm 分支,但未见覆盖 device_id>=0 的 CUDA/ROCm 行为测试。

RTP-LLM Checklist

  • [H] 测试与 CI — 测试覆盖充分:大重构等价覆盖,新功能端到端测试 → issue 缺少 device pinning 的有效覆盖
    device pinning 是新运行时行为,但没有覆盖 device_id>=0 的 CUDA/ROCm 后台线程测试。

Strengths

  • device_id 从 RemoteRpcServer/PyWrappedModel 传递到 cache store、Messager、TCP closure 和 CacheStoreAsyncWriter,后台 cache store 路径的设备语义更明确。
  • PendingTaskGuard 让后台任务抛异常时也能递减 pending_count_,降低 waitAllDone 永久等待风险。

@wht21

wht21 commented Jun 22, 2026

Copy link
Copy Markdown
Collaborator

internal source has been updated, please review the changes!

1 similar comment
@wht21

wht21 commented Jun 22, 2026

Copy link
Copy Markdown
Collaborator

internal source has been updated, please review the changes!

@Vinkle-hzt Vinkle-hzt enabled auto-merge (squash) June 23, 2026 01:59
@LLLLKKKK

Copy link
Copy Markdown
Collaborator

AI Code Review - PR #1116

Status: LGTM

Summary: P0/0 · P1/0 · P2/0 · P3/2

lgtm ready to ci

Non-blocking Suggestions

P3

  • PyWrappedModel.h 中 int64_t 到 int 的隐式窄化转换 @ rtp_llm/cpp/models/PyWrappedModel.h:275
    • 建议:与 RemoteRpcServer.cc 保持一致,加上 static_cast:CacheStoreAsyncWriter(static_cast<int>(params.parallelism_config.local_rank))
  • InitParams.h 文件末尾仍缺少换行符 @ rtp_llm/cpp/disaggregate/cache_store/InitParams.h:53
    • 建议:在文件末尾 } // namespace rtp_llm 后添加换行符,与 PR 中其他文件的清理保持一致。

Checklist ✅ (39 items passed)

Strengths

  • PendingTaskGuard RAII 模式替代手动 fetch_sub,确保异常路径下 pending_count_ 始终正确递减,提升了代码健壮性
  • pinThreadToDeviceOnce 使用 thread_local 缓存已 pin 设备,避免重复 cudaSetDevice 调用,对线程池场景高效
  • device_id 从 RemoteRpcServer/PyWrappedModel 到 NormalCacheStore、TcpMessager、TcpCacheStoreServiceImpl、TcpTransferConnection、Closure 的传播路径完整一致
  • 所有新增 device_id 参数均默认 -1(不 pin),保持完全向后兼容

@wht21

wht21 commented Jun 25, 2026

Copy link
Copy Markdown
Collaborator

internal source has been updated, please review the changes!

1 similar comment
@wht21

wht21 commented Jun 25, 2026

Copy link
Copy Markdown
Collaborator

internal source has been updated, please review the changes!

@LLLLKKKK

Copy link
Copy Markdown
Collaborator

AI Code Review - PR #1116

Status: LGTM

Summary: P0/0 · P1/0 · P2/1 · P3/1

lgtm ready to ci

Non-blocking Suggestions

P2

  • DevicePin.h 新工具函数无测试覆盖 @ rtp_llm/cpp/utils/DevicePin.h:15
    • 建议:添加 CPU 侧单测验证 thread_local 缓存逻辑(device_id=-1 时 early return、相同 device_id 重复调用不重入);CacheStoreAsyncWriterTest 中新增传入特定 device_id 值的用例,断言 writer.device_id_ 存储正确。GPU 侧行为通过 smoke 测试覆盖。

P3

  • PyWrappedModel.h 隐式窄化转换 int64_t → int @ rtp_llm/cpp/models/PyWrappedModel.h:275
    • 建议:与 RemoteRpcServer.cc 保持一致,使用 static_cast(params.parallelism_config.local_rank)。

Checklist Violations (1 fail / 55 total)

General Principles Checklist

  • [6.1] Tests — 新逻辑有聚焦单测 + 相关集成/smoke 测试 → issue DevicePin.h 新工具函数无测试覆盖
    DevicePin.h pinThreadToDeviceOnce 核心工具函数无独立单元测试。CacheStoreAsyncWriterTest 所有用例使用 device_id=-1 跳过 pinning 逻辑,device_id 参数传播未被验证。

Strengths

  • PendingTaskGuard RAII 模式确保异常路径下 pending_count_ 正确递减,同时修复了 pushTask 失败路径下 wait_cv_ 未 notify 的潜在竞态
  • thread_local 缓存使重复 pinThreadToDeviceOnce 调用接近零开销,且跨平台支持 CUDA/ROCm/CPU no-op
  • 所有新增构造函数参数均带 device_id = -1 默认值,保持完全向后兼容
  • device_id 从 RemoteRpcServer/PyWrappedModel 经 CacheStoreInitParams → MessagerInitParams → 各组件完整传播,覆盖所有 worker 线程入口

@wht21

wht21 commented Jun 29, 2026

Copy link
Copy Markdown
Collaborator

internal source has been updated, please review the changes!

@LLLLKKKK

Copy link
Copy Markdown
Collaborator

AI Code Review - PR #1116

Status: LGTM

Summary: P0/0 · P1/0 · P2/1 · P3/0

lgtm ready to ci

Non-blocking Suggestions

P2

  • PyWrappedModel 中 local_rank (int64_t) 传给 CacheStoreAsyncWriter(int) 缺少显式转型 @ rtp_llm/cpp/models/PyWrappedModel.h:275
    • 建议:与 RemoteRpcServer.cc 保持一致,改为 static_cast(params.parallelism_config.local_rank)。

Checklist Violations (3 fail / 39 total)

General Principles Checklist

  • [6.1] Tests — 新逻辑有聚焦单测 + 相关集成/smoke 测试 → checklist-only
    CacheStoreAsyncWriterTest.cpp 所有用例使用 device_id=-1(默认构造),pinThreadToDeviceOnce 代码路径未被直接测试覆盖。仅新增了 pending_count 断言。不升级为 issue 因为 pinThreadToDeviceOnce 是薄 CUDA 包装,需 GPU 环境验证,由集成测试间接覆盖。
  • [6.1] Tests — 分布式/跨平台变更有对应覆盖 → checklist-only
    DevicePin.h 新增 ROCm 分支(at::hip::set_device),但 ROCm 路径无法在 CUDA-only CI 上编译验证。不升级为 issue 因为此为平台硬件约束而非代码缺陷。

RTP-LLM Checklist

  • [F] 跨平台 — CUDA/ROCm 两侧 binding 对称清理 → checklist-only
    ROCm 分支使用 at::hip::set_device 在代码库中首次出现,现有 ROCm 设备设置均通过 cudaSetDevice(经 cuda_shims.h 映射为 hipSetDevice)完成。不升级为 issue 因为 at::hip::set_device 功能等价,但建议在 ROCm CI 中验证编译通过。

Strengths

  • PendingTaskGuard RAII 确保 pending_count 在所有退出路径(含异常)下正确递减,同时抽取 completePendingTask/storeCurrentException 消除重复逻辑
  • pinThreadToDeviceOnce 使用 thread_local 缓存避免重复 cudaSetDevice 调用,且通过 #if USING_CUDA/#elif USING_ROCM 干净处理双平台
  • device_id 从 local_rank 经 CacheStoreInitParams → NormalCacheStore → MessagerInitParams → TcpMessager → TcpCacheStoreServiceImpl → TcpTransferConnection 完整传播,覆盖所有 worker 线程入口
  • 所有新增参数均有 device_id = -1 安全默认值,现有调用方无需修改即可正常工作

@wht21

wht21 commented Jun 29, 2026

Copy link
Copy Markdown
Collaborator

internal source has been updated, please review the changes!

@LLLLKKKK

Copy link
Copy Markdown
Collaborator

AI Code Review - PR #1116

Status: BLOCKING

Summary: P0/0 · P1/0 · P2/0 · P3/2

Non-blocking Suggestions

P3

  • CPU-only 构建下 pinThreadToDeviceOnce 仍更新 thread_local 状态 @ rtp_llm/cpp/utils/DevicePin.h:28
    • 建议:可在 #if USING_CUDA || USING_ROCM 条件内包裹整个 thread_local 检查和赋值逻辑,使 CPU-only 构建下函数为纯空操作,语义更清晰。这是极低优先级的改进。
  • pinThreadToDeviceOnce 未处理 set_device 失败场景 @ rtp_llm/cpp/utils/DevicePin.h:25
    • 建议:实际场景中 device_id 来自 local_rank,几乎不可能无效,当前无需修改。如果未来有动态设备管理需求,可考虑在调用点添加 try-catch 并记录错误日志。

Checklist Violations (2 fail / 25 total)

General Principles Checklist

  • [6.1] Architecture — 错误语义:fail-fast/retry/fallback/silent 行为显式 → issue pinThreadToDeviceOnce 未处理 set_device 失败场景
    at::cuda::set_device() 在设备 ID 无效时会通过 C10_CUDA_CHECK 抛出异常。在 NormalCacheStore::initcheck_task_readiness lambda 中调用 pinThreadToDeviceOnce,该 lambda 无 try-catch 包裹,若 set_device 失败线程将直接终止。不过当前实现中 pinned_device 赋值在 set_device 之后,因此失败时不会错误标记为已绑定,下次调用会重试,行为合理。
  • [6.1] Software Engineering — KISS/YAGNI:无投机性抽象 → issue CPU-only 构建下 pinThreadToDeviceOnce 仍更新 thread_local 状态
    USING_CUDAUSING_ROCM 均未定义时,pinThreadToDeviceOnce 仍会执行 pinned_device = device_id 赋值,但实际未调用任何设备 API。虽然无害,但语义上不够精确——在 CPU-only 构建中 pinned_device 变量永远不会被真正使用。

Strengths

  • 设备绑定逻辑集中在单一工具函数 pinThreadToDeviceOnce 中,所有调用点统一使用,避免重复实现。
  • 所有新增参数均使用 device_id = -1 默认值,确保向后兼容——未设置时 pinThreadToDeviceOnce 为空操作,既有调用方无需修改。
  • CacheStoreAsyncWriterPendingTaskGuard RAII 重构消除了原先手动管理 pending_count_ 递减的异常不安全风险,代码更清晰。
  • 设备绑定覆盖完整:store 任务、load 任务、check_task_readiness 循环、TCP block read 闭包、load service 闭包、blockReadImpl 服务端处理、CacheStoreAsyncWriter 后台任务均已覆盖。
  • 测试增加了 pending_count_ 异常后归零的断言,验证 RAII guard 的正确性。

@LLLLKKKK

Copy link
Copy Markdown
Collaborator

AI Code Review - PR #1116

Status: BLOCKING

Summary: P0/0 · P1/0 · P2/3 · P3/0

Non-blocking Suggestions

P2

  • pinThreadToDeviceOnce 不处理同一线程被不同 device_id 调用的场景 @ rtp_llm/cpp/utils/DevicePin.h:22
    • 建议:建议在 pinned_device >= 0 && pinned_device != device_id 时至少打一条 WARNING 日志,帮助排查多 device 环境下的潜在问题。或者如果确认不应切换 device,则直接 return 不调用 set_device。
  • blockReadImpl 中 torch::from_blob 使用硬编码 torch::kCUDA 未适配 ROCm @ rtp_llm/cpp/disaggregate/cache_store/TcpCacheStoreServiceImpl.cpp:121
    • 建议:可考虑后续将 torch::kCUDA 替换为项目中统一的设备类型宏或工具函数,使其在 ROCm 平台上语义更清晰。本 PR 范围内无需改动,标记为后续改进项。
  • CacheStoreAsyncWriter 缺少 device pinning 的单元测试覆盖 @ rtp_llm/models_py/bindings/core/test/CacheStoreAsyncWriterTest.cpp:96
    • 建议:建议添加一个测试用 CacheStoreAsyncWriter writer(0) 构造,验证在无 GPU 环境下 submit+waitAllDone 正常工作,确认 device pinning 路径不会在 CPU-only 构建中崩溃。

Checklist Violations (3 fail / 25 total)

General Principles Checklist

  • [6.1] Architecture — 状态不变量:创建/更新/失败/重试/回滚路径有效 → issue pinThreadToDeviceOnce 不处理同一线程被不同 device_id 调用的场景
    pinThreadToDeviceOnceif (pinned_device == device_id) return; 的逻辑意味着同一线程如果先被 device 0 pin、后又被 device 1 调用,会静默执行第二次 set_device。在当前 cache store 架构中,线程池的线程可能被多个 NormalCacheStore 实例复用(虽然目前通常只有一个实例)。函数名中的 "Once" 暗示只 pin 一次,但实际行为是 "pin 到最新 device"——如果这是预期行为,函数名可考虑调整以避免误解。
  • [6.1] Tests — 新逻辑有聚焦单测 + 相关集成/smoke 测试 → issue CacheStoreAsyncWriter 缺少 device pinning 的单元测试覆盖
    测试新增了 ASSERT_EQ(0, writer.pending_count_.load()) 验证 RAII guard 的计数器归零,这很好。但所有测试都使用默认 device_id=-1 构造 writer,没有任何测试覆盖传入有效 device_id 的场景(即使只是验证构造不崩溃)。PendingTaskGuardstoreCurrentException 的抽取是正确的重构,但 pinThreadToDeviceOnce 在无 GPU 环境下是 no-op,可以安全地用非负 device_id 构造 writer 来验证路径覆盖。

RTP-LLM Checklist

  • [I] 代码质量 — 同一功能用统一工具函数 → issue blockReadImpl 中 torch::from_blob 使用硬编码 torch::kCUDA 未适配 ROCm
    blockReadImpl 新增了 pinThreadToDeviceOnce(device_id_) 调用(支持 CUDA 和 ROCm),但函数体内 torch::from_blob 硬编码 torch::kCUDA。在 ROCm 构建下 pinThreadToDeviceOnce 会调用 at::hip::set_device,但 torch::kCUDA 在 ROCm 构建中虽然通常 alias 到 HIP,仍存在语义不一致的风险。这不是本 PR 引入的问题,但 pin device 后这段代码变得更敏感——如果 pin 了错误的设备或平台不匹配,操作会发生在错误的 GPU 上。

Strengths

  • 统一的 pinThreadToDeviceOnce 工具函数设计简洁:thread_local 缓存避免重复 set_device 调用,支持 CUDA/ROCm/CPU 三平台条件编译,且仅在成功后更新缓存保持 fail-fast 语义。
  • PendingTaskGuard RAII 重构消除了 CacheStoreAsyncWriter::submit 中手动管理 pending_count 递减的重复代码和异常安全风险,使正常路径和异常路径都保证计数器正确递减。
  • device_id 从 parallelism_config.local_rank 取值并通过初始化参数层层传递,传播路径清晰:RemoteRpcServer → CacheStoreInitParams → NormalCacheStore → MessagerInitParams → TcpMessager → TcpCacheStoreServiceImpl → 各 Closure
  • 新增测试验证 pending_count_ 在异常后归零,覆盖了本次 RAII 重构的关键不变量。

@LLLLKKKK

Copy link
Copy Markdown
Collaborator

AI Code Review - PR #1116

Status: BLOCKING

Summary: P0/0 · P1/0 · P2/2 · P3/0

Non-blocking Suggestions

P2

  • Closure 中 torch::kCUDA 硬编码与 getPinnedTorchDeviceType() 不一致 @ rtp_llm/cpp/disaggregate/cache_store/TcpBlockReadClosure.cpp:70
    • 建议:将 TcpBlockReadClosure.cpp:70TcpCacheStoreLoadServiceClosure.cpp:61 中的 torch::kCUDA 替换为 getPinnedTorchDeviceType(),与 blockReadImpl 保持一致:unload_block->gpu_mem ? getPinnedTorchDeviceType() : torch::kCPU
  • 新增测试未验证设备绑定实际生效 @ rtp_llm/models_py/bindings/core/test/CacheStoreAsyncWriterTest.cpp:89
    • 建议:如果 CI 有 GPU 资源,可在 task lambda 中调用 cudaGetDevice(&dev) 后断言 dev == 0,验证设备绑定确实生效。如果 CI 无 GPU,建议在测试中使用 device_id=-1(no-op 路径)或添加 GPU 可用性 guard,避免测试在 CPU-only 环境下非预期失败。

Checklist Violations (2 fail / 25 total)

General Principles Checklist

  • [6.1] Tests — 新逻辑有聚焦单测 + 相关集成/smoke 测试 → issue 新增测试未验证设备绑定实际生效
    AsyncExecutionWithDeviceId 测试传入 device_id=0 并验证任务执行完成和计数正确,但未断言 worker 线程的 CUDA 设备实际被设置为 0(例如通过 cudaGetDevice 检查)。该测试仅覆盖了"带 device_id 构造不崩溃"的场景,与已有的 AsyncExecution 测试功能高度重叠。在无 GPU 的 CI 环境中 setCurrentThreadDeviceIfNeeded 会因 device_id >= 0 尝试调用 at::cuda::set_device(0) — 如果 CI 节点无可用 GPU,测试可能会失败或被静默跳过。

RTP-LLM Checklist

  • [I] 代码质量 — 同一功能用统一工具函数 → issue Closure 中 torch::kCUDA 硬编码与 getPinnedTorchDeviceType() 不一致
    TcpCacheStoreServiceImpl::blockReadImpl 已将 torch::kCUDA 改为 torch::Device(getPinnedTorchDeviceType()),但 TcpBlockReadClosure::Run() 第 70 行和 TcpCacheStoreLoadServiceClosure::Run() 第 61 行仍使用 unload_block->gpu_mem ? torch::kCUDA : torch::kCPU 硬编码。这两个 Closure 同样被本 PR 修改并已接收 device_id_、已调用 setCurrentThreadDeviceIfNeeded,但其 tensor 创建方式未同步使用 getPinnedTorchDeviceType()。在 ROCm 平台上,torch::kCUDA 在 HIP 构建中不等价于 c10::DeviceType::HIP,可能导致设备类型不匹配。

Strengths

  • 设备绑定逻辑集中在 DevicePin.h 中,采用 thread_local 缓存避免重复调用 cudaSetDevice,兼顾了性能和正确性。
  • PendingTaskGuard RAII 模式确保 pending_count_ 在异常和正常路径下都能正确递减,比原来的手动 catch + decrement 更加健壮。
  • 设备 ID 从 parallelism_config.local_rank 起始、沿 CacheStoreInitParams → MessagerInitParams → 各组件 传播,链路清晰,默认值 -1 保证向后兼容。
  • blockReadImpl 中将硬编码 torch::kCUDA 替换为 getPinnedTorchDeviceType() 增强了跨平台支持。

@LLLLKKKK

Copy link
Copy Markdown
Collaborator

AI Code Review - PR #1116

Status: BLOCKING

Summary: P0/0 · P1/0 · P2/3 · P3/1

Non-blocking Suggestions

P2

  • AsyncExecutionWithDeviceId 测试不能有效验证 device pinning 逻辑 @ rtp_llm/models_py/bindings/core/test/CacheStoreAsyncWriterTest.cpp:131
    • 建议:建议在多 GPU CI 环境中添加 device_id 非零的测试用例。如果 CI 环境只有单 GPU,可以用 GTEST_SKIP 条件跳过。最低限度可以验证 setCurrentThreadDeviceIfNeeded 的 thread_local 缓存命中路径(相同 device_id 连续调用第二次应跳过 set_device)。
  • thread_local 缓存在外部代码改变 CUDA device 时可能过时 @ rtp_llm/cpp/utils/DevicePin.h:21
    • 建议:如果性能允许,可以用 cudaGetDevice 验证当前实际设备是否匹配再决定是否跳过,或者接受每次都调用 set_device 的开销(当设备已正确时调用成本很低)。当前实现在独立线程池场景下是安全的,此为低概率风险提示。
  • setCurrentThreadDeviceIfNeeded 缺少专用单元测试 @ rtp_llm/cpp/utils/DevicePin.h:15
    • 建议:在 rtp_llm/cpp/utils/ 下添加 DevicePinTest.cpp,至少覆盖 device_id < 0 的 no-op 路径和 device_id = 0 的基本调用路径。多 GPU 场景按环境条件跳过。

P3

  • 测试使用 #define private public 访问私有成员 @ rtp_llm/models_py/bindings/core/test/CacheStoreAsyncWriterTest.cpp:2
    • 建议:这是项目中的既有模式(此文件在 PR 之前已使用此宏),不阻塞本 PR。后续可考虑通过 friend 声明测试类或仅通过公有 API 断言来改进。

Checklist Violations (3 fail / 25 total)

General Principles Checklist

  • [6.1] Tests — 新逻辑有聚焦单测 + 相关集成/smoke 测试 → issue setCurrentThreadDeviceIfNeeded 缺少专用单元测试
    DevicePin.h 中的 setCurrentThreadDeviceIfNeeded 包含非平凡逻辑:thread_local 缓存跳过重复调用、负 device_id 提前返回、跨平台条件编译。整个仓库中没有专门的单元测试。唯一间接覆盖是 CacheStoreAsyncWriterTest::AsyncExecutionWithDeviceId,但如前述该测试只验证 device 0(CUDA 默认值)。未测试场景包括:(1) 相同 device_id 连续调用应跳过第二次 set_device(缓存命中),(2) 不同 device_id 应重新调用 set_device,(3) 无效 device_id 应传播异常且不更新缓存。
  • [6.1] Tests — 边界 case 覆盖(空、单元素、最大值) → issue AsyncExecutionWithDeviceId 测试不能有效验证 device pinning 逻辑
    测试构造 CacheStoreAsyncWriter(0) 并断言工作线程看到 device 0。但 CUDA 运行时保证新线程默认 device 就是 0,无论是否调用 setCurrentThreadDeviceIfNeeded,此断言都会通过。要有效验证 pinning 逻辑,需要在多 GPU 环境下:(1) 主线程设置为 device N(N>0),(2) writer 使用 device_id=M(M!=N),(3) 验证工作线程看到 device M 而非继承父线程的 device。当前测试是恒真命题,验证的是 CUDA 默认行为而非 pinning 代码。

RTP-LLM Checklist

  • [I] 代码质量 — 同一功能用统一工具函数 → issue 测试使用 #define private public 访问私有成员
    第 2 行 #define private public 在包含头文件前修改访问控制,这在 C++ 标准下是未定义行为(可能改变类的布局和 name mangling)。测试直接访问 state_pending_count_ 等私有成员。现有测试中多数公有 API 断言(counter 值、异常传播)已能验证正确性,部分私有成员检查是冗余的。

Strengths

  • DevicePin.h 设计简洁,thread_local 缓存避免重复系统调用,fail-fast 语义正确(set_device 异常时不更新缓存,下次调用会重试)。
  • PendingTaskGuard RAII 重构消除了手动 pending_count_ 递减的多路径重复,异常路径和正常路径统一处理,降低了遗漏递减的风险。
  • device_id 传播链完整覆盖了所有 TCP 路径的 GPU 操作站点(NormalCacheStore 3 个 lambda、TcpCacheStoreServiceImpl::blockReadImpl、TcpBlockReadClosure::Run、TcpCacheStoreLoadServiceClosure::Run)。
  • RDMA 路径未添加 pinning 是合理的,因为 RDMA 数据传输绑定为直接内存操作,不涉及 CUDA API。
  • 使用 at::cuda::set_device 而非原始 cudaSetDevice 是正确选择,确保 PyTorch 内部的 per-thread stream/cublas handle 跟踪同步更新。
  • 新增测试覆盖了 GPU pinning 的基本路径和异常后 pending_count 归零的断言。

@LLLLKKKK

Copy link
Copy Markdown
Collaborator

AI Code Review - PR #1116

Status: BLOCKING

Summary: P0/0 · P1/0 · P2/1 · P3/1

Non-blocking Suggestions

P2

  • getNoBlockCopyStream 为 thread_local 且绑定首次创建时的设备 @ rtp_llm/cpp/disaggregate/cache_store/TcpBlockReadClosure.cpp:33
    • 建议:考虑在 execNoBlockCopy(CopyParams 重载)中检测 current device 与 stream 所属 device 不一致时重新获取 stream,或者在 DevicePin 文档中注明此已知限制。非阻塞优先级较低,可后续优化。

P3

  • DevicePin 仅调用 at::cuda::set_device 而未设置 CUDA stream @ rtp_llm/cpp/utils/DevicePin.h:49
    • 建议:在 DevicePin.h 注释中明确说明此函数的适用范围(设备上下文固定,非完整 CUDA 运行时初始化),与 cudaPreRun 区分定位。

Checklist Violations (2 fail / 25 total)

General Principles Checklist

  • [6.1] Architecture — 状态不变量:创建/更新/失败/重试/回滚路径有效 → issue getNoBlockCopyStream 为 thread_local 且绑定首次创建时的设备
    getNoBlockCopyStream()static thread_local,在线程首次调用 execNoBlockCopy 时通过 at::cuda::getStreamFromPool 在当时的 current device 上创建并永久缓存。若线程池工作线程首次服务 device 0 后被重定向到 device 1(通过 setCurrentThreadDeviceIfNeeded),后续 execNoBlockCopy 仍会使用 device 0 上的 stream 执行 memcpy。cudaMemcpyDefault 使得拷贝功能正确,但跨设备 stream 可能导致次优的 DMA 路径选择和调度延迟。此为预存问题但本 PR 放大了触发概率(多设备重定向场景)。

RTP-LLM Checklist

  • [I] 代码质量 — 同一功能用统一工具函数 → issue DevicePin 仅调用 at::cuda::set_device 而未设置 CUDA stream
    现有 cudaPreRun(ExecOps.cc:328)做三步:cudaSetDevice + at::cuda::set_device + at::cuda::setCurrentCUDAStream。DevicePin.h 仅做 at::cuda::set_device(内部包含 cudaSetDevice)。对于 cache store 场景(仅需正确设备上下文用于 torch::from_blobcudaMemcpyAsync),这已足够。但若未来有线程池任务需要使用 PyTorch 默认 stream 调度 kernel,可能需补充 stream 设置。

Strengths

  • 抽象层次恰当:setCurrentThreadDeviceIfNeeded 模板化 impl + thread_local 缓存设计简洁,避免重复 syscall 开销同时支持跨平台(CUDA/ROCm/CPU no-op)。
  • 设备 ID 传播路径完整:从 local_rankCacheStoreInitParamsMessagerInitParams → 各 Service/Closure/Connection/AsyncWriter,未遗漏调用点。
  • 测试设计优秀:DevicePinTest 覆盖负值跳过、缓存命中、重定向、异常不污染缓存四种场景;CacheStoreAsyncWriterTest::AsyncExecutionWithDeviceId 在多 GPU 环境验证端到端行为。
  • 容错设计:device_id < 0 时静默跳过,CPU-only 构建 no-op,异常时缓存不更新保证下次重试。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants